package org.codehaus.activemq.broker.impl;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.jms.InvalidClientIDException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.transaction.xa.XAException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.Broker;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.broker.BrokerConnector;
import org.codehaus.activemq.broker.BrokerContainer;
import org.codehaus.activemq.capacity.CapacityMonitorEvent;
import org.codehaus.activemq.capacity.CapacityMonitorEventListener;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ActiveMQXid;
import org.codehaus.activemq.message.ConnectionInfo;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.DurableUnsubscribe;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.ProducerInfo;
import org.codehaus.activemq.message.SessionInfo;
import org.codehaus.activemq.service.Service;
import org.codehaus.activemq.store.PersistenceAdapter;

public class BrokerContainerImpl
  implements BrokerContainer, CapacityMonitorEventListener
{
  private static final Log log = LogFactory.getLog(BrokerContainerImpl.class);
  private Broker broker;
  private Map clientIds;
  private Map consumerInfos;
  private Map producerInfos;
  private List connectors;
  private Thread shutdownHook;
  private boolean stopped;

  public BrokerContainerImpl(String brokerName)
  {
    this(new DefaultBroker(brokerName));
  }

  public BrokerContainerImpl(String brokerName, PersistenceAdapter persistenceAdapter) {
    this(new DefaultBroker(brokerName, persistenceAdapter));
  }

  public BrokerContainerImpl(Broker broker)
  {
    this.broker = broker;
    this.clientIds = new ConcurrentHashMap();
    this.consumerInfos = new ConcurrentHashMap();
    this.producerInfos = new ConcurrentHashMap();
    this.connectors = new CopyOnWriteArrayList();
    this.broker.addCapacityEventListener(this);
  }

  public List getConnectors() {
    return this.connectors;
  }

  public void setConnectors(List connectors) {
    this.connectors = connectors;
  }

  public Broker getBroker()
  {
    return this.broker;
  }

  public PersistenceAdapter getPersistenceAdapter() {
    return this.broker != null ? this.broker.getPersistenceAdapter() : null;
  }

  public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) {
    if (this.broker == null) {
      throw new IllegalStateException("Cannot set this property as we don't have a broker yet");
    }
    this.broker.setPersistenceAdapter(persistenceAdapter);
  }

  public void start()
    throws JMSException
  {
    log.info("ActiveMQ JMS Message Broker is starting");
    log.info("For help or more information please see: http://activemq.codehaus.org/");
    this.broker.start();
    addShutdownHook();
    log.info("ActiveMQ JMS Message Broker has started");
  }

  protected void addShutdownHook() {
    this.shutdownHook = new Thread() {
      public void run() {
        BrokerContainerImpl.this.containerShutdown();
      }
    };
    Runtime.getRuntime().addShutdownHook(this.shutdownHook);
  }

  public synchronized void stop()
    throws JMSException
  {
    if (!this.stopped) {
      log.info("ActiveMQ Message Broker is shutting down");
      try
      {
        Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
      }
      catch (Exception e)
      {
      }

      JMSException firstException = null;

      for (Iterator iter = new ArrayList(this.connectors).iterator(); iter.hasNext(); ) {
        Service connector = (Service)iter.next();
        try {
          connector.stop();
        }
        catch (JMSException e) {
          if (firstException == null) {
            firstException = e;
          }
          log.warn("Could not close connector: " + connector + " due to: " + e, e);
        }
      }
      this.connectors.clear();

      for (Iterator iter = this.clientIds.values().iterator(); iter.hasNext(); )
      {
        BrokerClient client = (BrokerClient)iter.next();
        try {
          client.stop();
        }
        catch (JMSException e) {
          if (firstException == null) {
            firstException = e;
          }
          log.warn("Could not close client: " + client + " due to: " + e, e);
        }
      }
      this.clientIds.clear();

      this.broker.removeCapacityEventListener(this);
      this.broker.stop();

      log.info("ActiveMQ JMS Message Broker stopped");

      this.stopped = true;
      if (firstException != null)
        throw firstException;
    }
  }

  public void registerConnection(BrokerClient client, ConnectionInfo info)
    throws InvalidClientIDException
  {
    String clientId = info.getClientId();
    if (this.clientIds.containsKey(clientId)) {
      throw new InvalidClientIDException("Duplicate clientId: " + info);
    }
    log.info("Adding new client: " + clientId + " on transport: " + client.getChannel());
    this.clientIds.put(clientId, client);
  }

  public void deregisterConnection(BrokerClient client, ConnectionInfo info)
    throws JMSException
  {
    String clientId = client.getClientID();
    if (clientId != null) {
      Object answer = this.clientIds.remove(clientId);
      if (answer != null) {
        log.info("Removing client: " + clientId + " on transport: " + client.getChannel());
        getBroker().cleanUpClient(client);
      }
      else {
        log.warn("Got duplicate deregisterConnection for client: " + clientId);
      }
    }
    else {
      log.warn("No clientID available for client: " + client);
    }
  }

  public void registerMessageConsumer(BrokerClient client, ConsumerInfo info)
    throws JMSException
  {
    this.consumerInfos.put(info, client);
    getBroker().addMessageConsumer(client, info);
  }

  public void deregisterMessageConsumer(BrokerClient client, ConsumerInfo info)
    throws JMSException
  {
    this.consumerInfos.remove(info);
    getBroker().removeMessageConsumer(client, info);
  }

  public void registerMessageProducer(BrokerClient client, ProducerInfo info)
    throws JMSException
  {
    ActiveMQDestination dest = info.getDestination();
    if ((dest != null) && (dest.isTemporary()))
    {
      String clientId = ActiveMQDestination.getClientId(dest);
      if (clientId == null) {
        throw new InvalidDestinationException("Destination " + dest.getPhysicalName() + " is a temporary destination with null clientId");
      }

      if (!this.clientIds.containsKey(clientId)) {
        throw new InvalidDestinationException("Destination " + dest.getPhysicalName() + " is no longer valid because the client " + clientId + " no longer exists");
      }
    }

    this.producerInfos.put(info, client);
  }

  public void deregisterMessageProducer(BrokerClient client, ProducerInfo info)
    throws JMSException
  {
    this.producerInfos.remove(info);
  }

  public void registerSession(BrokerClient client, SessionInfo info)
    throws JMSException
  {
  }

  public void deregisterSession(BrokerClient client, SessionInfo info)
    throws JMSException
  {
  }

  public void startTransaction(BrokerClient client, String transactionId)
    throws JMSException
  {
    getBroker().startTransaction(client, transactionId);
  }

  public void rollbackTransaction(BrokerClient client, String transactionId)
    throws JMSException
  {
    getBroker().rollbackTransaction(client, transactionId);
  }

  public void commitTransaction(BrokerClient client, String transactionId)
    throws JMSException
  {
    getBroker().commitTransaction(client, transactionId);
  }

  public void sendTransactedMessage(BrokerClient client, String transactionId, ActiveMQMessage message)
    throws JMSException
  {
    getBroker().sendTransactedMessage(client, transactionId, message);
  }

  public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack)
    throws JMSException
  {
    getBroker().acknowledgeTransactedMessage(client, transactionId, ack);
  }

  public void sendMessage(BrokerClient client, ActiveMQMessage message)
    throws JMSException
  {
    getBroker().sendMessage(client, message);
  }

  public void acknowledgeMessage(BrokerClient client, MessageAck ack)
    throws JMSException
  {
    getBroker().acknowledgeMessage(client, ack);
  }

  public void durableUnsubscribe(BrokerClient client, DurableUnsubscribe ds)
    throws JMSException
  {
    getBroker().deleteSubscription(ds.getClientId(), ds.getSubscriberName());
  }

  public void startTransaction(BrokerClient client, ActiveMQXid xid)
    throws XAException
  {
    getBroker().startTransaction(client, xid);
  }

  public ActiveMQXid[] getPreparedTransactions(BrokerClient client)
    throws XAException
  {
    return getBroker().getPreparedTransactions(client);
  }

  public int prepareTransaction(BrokerClient client, ActiveMQXid xid)
    throws XAException
  {
    return getBroker().prepareTransaction(client, xid);
  }

  public void rollbackTransaction(BrokerClient client, ActiveMQXid xid)
    throws XAException
  {
    getBroker().rollbackTransaction(client, xid);
  }

  public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase)
    throws XAException
  {
    getBroker().commitTransaction(client, xid, onePhase);
  }

  public void addConnector(BrokerConnector connector) {
    this.connectors.add(connector);
  }

  public void removeConnector(BrokerConnector connector) {
    this.connectors.remove(connector);
  }

  public void capacityChanged(CapacityMonitorEvent event)
  {
    for (Iterator i = this.producerInfos.values().iterator(); i.hasNext(); ) {
      BrokerClient client = (BrokerClient)i.next();
      client.updateBrokerCapacity(event.getCapacity());
    }
  }

  protected void containerShutdown()
  {
    try
    {
      stop();
    }
    catch (JMSException e) {
      Exception linkedException = e.getLinkedException();
      if (linkedException != null) {
        log.error("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
      }
      else
        log.error("Failed to shut down: " + e, e);
    }
    catch (Exception e)
    {
      log.error("Failed to shut down: " + e, e);
    }
  }
}